package connection

import (
	"github.com/gomodule/redigo/redis"
	"log"
	"src/config"
	"sync"
	"time"
	"unsafe"
)

// Redis client and pub/sub message subscription/receiving.
//
// NOTE(review): once is not referenced anywhere in the visible source —
// confirm whether other files in this package rely on it.
var once sync.Once

// PubRedis wraps a redigo connection pool used for Redis publish/subscribe.
type PubRedis struct {
	// Closed is set to true by Close.
	Closed bool
	// Client is the connection pool assigned by GetRedisPool.
	Client *redis.Pool
	// Embedded mutex; none of the methods visible in this file lock it.
	sync.Mutex
}

// Get returns the PubRedis produced by NewRedisClient.
//
// The returned value is a separate instance; the receiver's own Client and
// Closed fields are not touched by this call.
func (rds *PubRedis) Get() *PubRedis {
	client := rds.NewRedisClient()
	return client
}

// NewRedisClient builds a brand-new PubRedis via GetRedisPool and returns it.
//
// The receiver is neither read nor modified: every call yields an independent
// instance with its own pool. If the freshly built instance is nil or already
// marked Closed, the build is retried once per second until it succeeds.
// The retry is a loop rather than a recursive call so that repeated failures
// cannot grow the stack.
func (rds *PubRedis) NewRedisClient() *PubRedis {
	for {
		client := (&PubRedis{}).GetRedisPool()
		if client != nil && !client.Closed {
			return client
		}
		// Wait one second before trying again.
		time.Sleep(1 * time.Second)
	}
}

// GetRedisPool builds a redigo connection pool from the
// "subscribe.redis.conf.*" configuration keys, stores it in rds.Client and
// returns rds.
//
// Configuration keys read: ip, port, password, db, maxIdle, active, auth and
// idleTimeout (in seconds). When auth is true the password is sent while
// dialing; the database index is always selected.
func (rds *PubRedis) GetRedisPool() *PubRedis {
	address := config.V().GetString("subscribe.redis.conf.ip") + ":" + config.V().GetString("subscribe.redis.conf.port")
	password := config.V().GetString("subscribe.redis.conf.password")
	db := config.V().GetInt("subscribe.redis.conf.db")
	maxIdle := config.V().GetInt("subscribe.redis.conf.maxIdle")
	maxActive := config.V().GetInt("subscribe.redis.conf.active")
	auth := config.V().GetBool("subscribe.redis.conf.auth")
	idleTimeout := config.V().GetInt("subscribe.redis.conf.idleTimeout")

	rds.Client = &redis.Pool{
		// Validate a pooled connection with PING before handing it out.
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			_, err := c.Do("PING")
			return err
		},
		MaxIdle:     maxIdle,   // maximum number of idle connections kept in the pool
		MaxActive:   maxActive, // maximum number of connections; 0 means no limit
		IdleTimeout: time.Duration(idleTimeout) * time.Second,
		Wait:        true, // wait for a free connection once MaxActive is reached
		Dial: func() (redis.Conn, error) {
			opts := make([]redis.DialOption, 0, 2)
			failure := "redisConn3 链接失败"
			if auth {
				opts = append(opts, redis.DialPassword(password))
				failure = "redisConn2 链接失败"
			}
			opts = append(opts, redis.DialDatabase(db))

			conn, err := redis.Dial("tcp", address, opts...)
			if err != nil {
				log.Println(failure, err)
				// Propagate the failure: returning a nil error here would make
				// the pool hand out a nil connection that panics on first use.
				return nil, err
			}
			return conn, nil
		},
	}
	return rds
}

// Ping checks whether the Redis connection is healthy by sending a "ping"
// command on a connection borrowed from rds.Client.
//
// It returns the error from the command, or nil on success. The whole pool is
// closed afterwards (rds.Close), so rds.Client cannot be used for further
// commands once Ping returns.
func (rds *PubRedis) Ping() error {
	defer rds.Close()
	conn := rds.Client.Get()
	// Deferred calls run last-in-first-out: the connection is released before
	// the pool is shut down, so it is not leaked.
	defer conn.Close()
	_, err := conn.Do("ping")
	return err
}

// SendPublish publishes msg on channelName using a fresh client obtained from
// rds.Get. The command is written with Send and Flush; the reply is not read
// by this method.
//
// The connection and the pool created for this call are both released before
// returning. The receiver's own pool is left untouched.
//
// It returns the first error reported by Send or Flush, or nil.
func (rds *PubRedis) SendPublish(channelName string, msg any) error {
	client := rds.Get()
	// Close the pool that was actually opened for this call (runs last).
	defer client.Close()
	conn := client.Client.Get()
	// Release the connection first (deferred calls run last-in-first-out).
	defer conn.Close()

	if err := conn.Send("Publish", channelName, msg); err != nil {
		return err
	}
	return conn.Flush()
}
// DoPublish publishes msg on channelName over a connection borrowed from
// rds.Client and returns the reply of the command.
//
// On failure it returns an empty string together with the error. The whole
// pool is closed afterwards (rds.Close), so rds.Client cannot be used for
// further commands once DoPublish returns.
func (rds *PubRedis) DoPublish(channelName string, msg any) (any, error) {
	conn := rds.Client.Get()
	defer rds.Close()
	// Registered last so it runs first: the connection is released before the
	// pool is shut down, so it is not leaked.
	defer conn.Close()

	result, err := conn.Do("Publish", channelName, msg)
	if err != nil {
		return "", err
	}
	return result, nil
}
// Close marks rds as closed and shuts down its connection pool.
//
// It is safe to call when no pool has been assigned yet (rds.Client == nil).
// A failure to close the pool is logged rather than silently dropped, since
// the method has no error return.
func (rds *PubRedis) Close() {
	rds.Closed = true
	if rds.Client == nil {
		return
	}
	if err := rds.Client.Close(); err != nil {
		log.Println("redis pool close failed:", err)
	}
}

// PubSubConn subscribes to channelName and invokes callback for every message
// received on it. It does not return: whenever a subscription session ends
// (subscribe error, receive error, or a panic raised while the session runs,
// including one from callback) it waits one second, logs the cause and starts
// a new session.
//
// Parameters:
//   - channelName: Redis channel to subscribe to.
//   - callback: called synchronously with the channel name and the payload of
//     each received message.
//
// Reconnection is a loop rather than a recursive call from a deferred
// function, so repeated reconnects do not grow the stack.
func (rds *PubRedis) PubSubConn(channelName string, callback func(channel string, byte2 []byte)) {
	for {
		cause := rds.subscribeOnce(channelName, callback)
		// Back off for one second before reconnecting.
		time.Sleep(1 * time.Second)
		log.Println(cause, "redis订阅，失败重连")
	}
}

// subscribeOnce runs a single subscription session on a fresh client obtained
// from rds.Get and returns the reason the session ended: an error, an error
// message string, or the value of a recovered panic. The connection and the
// pool opened for the session are released before it returns.
func (rds *PubRedis) subscribeOnce(channelName string, callback func(channel string, byte2 []byte)) (cause any) {
	// Registered first so it runs last: turns any panic raised during the
	// session or during cleanup into the returned cause.
	defer func() {
		if r := recover(); r != nil {
			cause = r
		}
	}()

	client := rds.Get()
	defer client.Close()
	conn := client.Client.Get()
	defer conn.Close()

	subc := redis.PubSubConn{Conn: conn}
	if err := subc.Subscribe(channelName); err != nil {
		return err
	}

	for {
		switch res := subc.Receive().(type) {
		case nil:
			return "读取失败"
		case redis.Message:
			// NOTE(review): in redigo's Message, Channel is a string and Data
			// is a []byte, so these unsafe casts look like identity
			// conversions. They are kept unchanged; they are also the only
			// use of the unsafe import in the visible source.
			channel := (*string)(unsafe.Pointer(&res.Channel))
			message := (*[]byte)(unsafe.Pointer(&res.Data))
			callback(*channel, *message)
		case redis.Subscription:
			// Subscribe/unsubscribe confirmations are ignored.
		case error:
			return res.Error()
		}
	}
}
